{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 异步计算\n",
    "\n",
    "MXNet使用异步计算来提升计算性能。理解它的工作原理既有助于开发更高效的程序，又有助于在内存资源有限的情况下主动降低计算性能从而减小内存开销。我们先导入本节中实验需要的包或模块。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "1"
    }
   },
   "outputs": [],
   "source": [
    "from mxnet import autograd, gluon, nd\n",
    "from mxnet.gluon import loss as gloss, nn\n",
    "import os\n",
    "import subprocess\n",
    "import time"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## MXNet中的异步计算\n",
    "\n",
    "广义上讲，MXNet包括用户直接用来交互的前端和系统用来执行计算的后端。例如，用户可以使用不同的前端语言编写MXNet程序，如Python、R、Scala和C++。无论使用何种前端编程语言，MXNet程序的执行主要都发生在C++实现的后端。换句话说，用户写好的前端MXNet程序会传给后端执行计算。后端有自己的线程在队列中不断收集任务并执行它们。\n",
    "\n",
    "MXNet通过前端线程和后端线程的交互实现异步计算。异步计算指，前端线程无须等待当前指令从后端线程返回结果就继续执行后面的指令。为了便于解释，假设Python前端线程调用以下4条指令。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "3"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\n",
       "[[3. 3.]]\n",
       "<NDArray 1x2 @cpu(0)>"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "a = nd.ones((1, 2))\n",
    "b = nd.ones((1, 2))\n",
    "c = a * b + 2\n",
    "c"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在异步计算中，Python前端线程执行前3条语句的时候，仅仅是把任务放进后端的队列里就返回了。当最后一条语句需要打印计算结果时，Python前端线程会等待C++后端线程把变量`c`的结果计算完。此设计的一个好处是，这里的Python前端线程不需要做实际计算。因此，无论Python的性能如何，它对整个程序性能的影响很小。只要C++后端足够高效，那么不管前端编程语言性能如何，MXNet都可以提供一致的高性能。\n",
    "\n",
    "为了演示异步计算的性能，我们先实现一个简单的计时类。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Benchmark():  # 本类已保存在d2lzh包中方便以后使用\n",
    "    def __init__(self, prefix=None):\n",
    "        self.prefix = prefix + ' ' if prefix else ''\n",
    "\n",
    "    def __enter__(self):\n",
    "        self.start = time.time()\n",
    "\n",
    "    def __exit__(self, *args):\n",
    "        print('%stime: %.4f sec' % (self.prefix, time.time() - self.start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "下面的例子通过计时来展示异步计算的效果。可以看到，当`y = nd.dot(x, x).sum()`返回的时候并没有等待变量`y`真正被计算完。只有当`print`函数需要打印变量`y`时才必须等待它计算完。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "4"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Workloads are queued. time: 0.0006 sec\n",
      "sum = \n",
      "[2.0003661e+09]\n",
      "<NDArray 1 @cpu(0)>\n",
      "Workloads are finished. time: 0.1148 sec\n"
     ]
    }
   ],
   "source": [
    "with Benchmark('Workloads are queued.'):\n",
    "    x = nd.random.uniform(shape=(2000, 2000))\n",
    "    y = nd.dot(x, x).sum()\n",
    "\n",
    "with Benchmark('Workloads are finished.'):\n",
    "    print('sum =', y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "的确，除非我们需要打印或者保存计算结果，否则我们基本无须关心目前结果在内存中是否已经计算好了。只要数据是保存在`NDArray`里并使用MXNet提供的运算符，MXNet将默认使用异步计算来获取高计算性能。\n",
    "\n",
    "\n",
    "## 用同步函数让前端等待计算结果\n",
    "\n",
    "除了刚刚介绍的`print`函数外，我们还有其他方法让前端线程等待后端的计算结果完成。我们可以使用`wait_to_read`函数让前端等待某个的`NDArray`的计算结果完成，再执行前端中后面的语句。或者，我们可以用`waitall`函数令前端等待前面所有计算结果完成。后者是性能测试中常用的方法。\n",
    "\n",
    "下面是使用`wait_to_read`函数的例子。输出用时包含了变量`y`的计算时间。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "5"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "time: 0.0678 sec\n"
     ]
    }
   ],
   "source": [
    "with Benchmark():\n",
    "    y = nd.dot(x, x)\n",
    "    y.wait_to_read()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "下面是使用`waitall`函数的例子。输出用时包含了变量`y`和变量`z`的计算时间。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "6"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "time: 0.1314 sec\n"
     ]
    }
   ],
   "source": [
    "with Benchmark():\n",
    "    y = nd.dot(x, x)\n",
    "    z = nd.dot(x, x)\n",
    "    nd.waitall()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "此外，任何将`NDArray`转换成其他不支持异步计算的数据结构的操作都会让前端等待计算结果。例如，当我们调用`asnumpy`函数和`asscalar`函数时："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "7"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "time: 0.0698 sec\n"
     ]
    }
   ],
   "source": [
    "with Benchmark():\n",
    "    y = nd.dot(x, x)\n",
    "    y.asnumpy()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "8"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "time: 0.1085 sec\n"
     ]
    }
   ],
   "source": [
    "with Benchmark():\n",
    "    y = nd.dot(x, x)\n",
    "    y.norm().asscalar()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "上面介绍的`wait_to_read`函数、`waitall`函数、`asnumpy`函数、`asscalar`函数和`print`函数会触发让前端等待后端计算结果的行为。这类函数通常称为同步函数。\n",
    "\n",
    "\n",
    "## 使用异步计算提升计算性能\n",
    "\n",
    "在下面的例子中，我们用`for`循环不断对变量`y`赋值。当在`for`循环内使用同步函数`wait_to_read`时，每次赋值不使用异步计算；当在`for`循环外使用同步函数`waitall`时，则使用异步计算。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "9"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "synchronous. time: 0.6014 sec\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "asynchronous. time: 0.3912 sec\n"
     ]
    }
   ],
   "source": [
    "with Benchmark('synchronous.'):\n",
    "    for _ in range(1000):\n",
    "        y = x + 1\n",
    "        y.wait_to_read()\n",
    "\n",
    "with Benchmark('asynchronous.'):\n",
    "    for _ in range(1000):\n",
    "        y = x + 1\n",
    "    nd.waitall()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们观察到，使用异步计算能提升一定的计算性能。为了解释这一现象，让我们对Python前端线程和C++后端线程的交互稍作简化。在每一次循环中，前端和后端的交互大约可以分为3个阶段：\n",
    "\n",
    "1. 前端令后端将计算任务`y = x + 1`放进队列；\n",
    "1. 后端从队列中获取计算任务并执行真正的计算；\n",
    "1. 后端将计算结果返回给前端。\n",
    "\n",
    "我们将这3个阶段的耗时分别设为$t_1, t_2, t_3$。如果不使用异步计算，执行1000次计算的总耗时大约为$1000 (t_1+ t_2 + t_3)$；如果使用异步计算，由于每次循环中前端都无须等待后端返回计算结果，执行1000次计算的总耗时可以降为$t_1 + 1000 t_2 + t_3$（假设$1000t_2 > 999t_1$）。\n",
    "\n",
    "## 异步计算对内存的影响\n",
    "\n",
    "为了解释异步计算对内存使用的影响，让我们先回忆一下前面章节的内容。在前面章节中实现的模型训练过程中，我们通常会在每个小批量上评测一下模型，如模型的损失或者精度。细心的读者也许已经发现了，这类评测常用到同步函数，如`asscalar`函数或者`asnumpy`函数。如果去掉这些同步函数，前端会将大量的小批量计算任务在极短的时间内丢给后端，从而可能导致占用更多内存。当我们在每个小批量上都使用同步函数时，前端在每次迭代时仅会将一个小批量的任务丢给后端执行计算，并通常会减小内存占用。\n",
    "\n",
    "由于深度学习模型通常比较大，而内存资源通常有限，建议大家在训练模型时对每个小批量都使用同步函数，例如，用`asscalar`函数或者`asnumpy`函数评价模型的表现。类似地，在使用模型预测时，为了减小内存的占用，也建议大家对每个小批量预测时都使用同步函数，例如，直接打印出当前小批量的预测结果。\n",
    "\n",
    "下面我们来演示异步计算对内存的影响。我们先定义一个数据获取函数`data_iter`，它会从被调用时开始计时，并定期打印到目前为止获取数据批量的总耗时。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "11"
    }
   },
   "outputs": [],
   "source": [
    "def data_iter():\n",
    "    start = time.time()\n",
    "    num_batches, batch_size = 100, 1024\n",
    "    for i in range(num_batches):\n",
    "        X = nd.random.normal(shape=(batch_size, 512))\n",
    "        y = nd.ones((batch_size,))\n",
    "        yield X, y\n",
    "        if (i + 1) % 50 == 0:\n",
    "            print('batch %d, time %f sec' % (i + 1, time.time() - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "下面定义多层感知机、优化算法和损失函数。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "12"
    }
   },
   "outputs": [],
   "source": [
    "net = nn.Sequential()\n",
    "net.add(nn.Dense(2048, activation='relu'),\n",
    "        nn.Dense(512, activation='relu'),\n",
    "        nn.Dense(1))\n",
    "net.initialize()\n",
    "trainer = gluon.Trainer(net.collect_params(), 'sgd', {'learning_rate': 0.005})\n",
    "loss = gloss.L2Loss()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "这里定义辅助函数来监测内存的使用。需要注意的是，这个函数只能在Linux或macOS上运行。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "13"
    }
   },
   "outputs": [],
   "source": [
    "def get_mem():\n",
    "    res = subprocess.check_output(['ps', 'u', '-p', str(os.getpid())])\n",
    "    return int(str(res).split()[15]) / 1e3"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "现在我们可以做测试了。我们先试运行一次，让系统把`net`的参数初始化。有关初始化的讨论可参见[“模型参数的延后初始化”](../chapter_deep-learning-computation/deferred-init.ipynb)一节。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "14"
    }
   },
   "outputs": [],
   "source": [
    "for X, y in data_iter():\n",
    "    break\n",
    "loss(y, net(X)).wait_to_read()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "对于训练模型`net`来说，我们可以自然地使用同步函数`asscalar`将每个小批量的损失从`NDArray`格式中取出，并打印每个迭代周期后的模型损失。此时，每个小批量的生成间隔较长，不过内存开销较小。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "17"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "batch 50, time 3.752532 sec\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "batch 100, time 7.542264 sec\n",
      "increased memory: 2.920000 MB\n"
     ]
    }
   ],
   "source": [
    "l_sum, mem = 0, get_mem()\n",
    "for X, y in data_iter():\n",
    "    with autograd.record():\n",
    "        l = loss(y, net(X))\n",
    "    l_sum += l.mean().asscalar()  # 使用同步函数asscalar\n",
    "    l.backward()\n",
    "    trainer.step(X.shape[0])\n",
    "nd.waitall()\n",
    "print('increased memory: %f MB' % (get_mem() - mem))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "如果去掉同步函数，虽然每个小批量的生成间隔较短，但训练过程中可能会导致内存占用较高。这是因为在默认异步计算下，前端会将所有小批量计算在短时间内全部丢给后端。这可能在内存积压大量中间结果无法释放。实验中我们看到，不到一秒，所有数据（`X`和`y`）就都已经产生。但因为训练速度没有跟上，所以这些数据只能放在内存里不能及时清除，从而占用额外内存。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "attributes": {
     "classes": [],
     "id": "",
     "n": "18"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "batch 50, time 0.075835 sec\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "batch 100, time 0.151360 sec\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "increased memory: 2.820000 MB\n"
     ]
    }
   ],
   "source": [
    "mem = get_mem()\n",
    "for X, y in data_iter():\n",
    "    with autograd.record():\n",
    "        l = loss(y, net(X))\n",
    "    l.backward()\n",
    "    trainer.step(X.shape[0])\n",
    "nd.waitall()\n",
    "print('increased memory: %f MB' % (get_mem() - mem))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 小结\n",
    "\n",
    "* MXNet包括用户直接用来交互的前端和系统用来执行计算的后端。\n",
    "\n",
    "* MXNet能够通过异步计算提升计算性能。\n",
    "\n",
    "* 建议使用每个小批量训练或预测时至少使用一个同步函数，从而避免在短时间内将过多计算任务丢给后端。\n",
    "\n",
    "\n",
    "## 练习\n",
    "\n",
    "* 在“使用异步计算提升计算性能”一节中，我们提到使用异步计算可以使执行1000次计算的总耗时降为$t_1 + 1000 t_2 + t_3$。这里为什么要假设$1000t_2 > 999t_1$？\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "## 扫码直达[讨论区](https://discuss.gluon.ai/t/topic/1881)\n",
    "\n",
    "![](../img/qr_async-computation.svg)"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}